/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */

/* Number of produced messages still awaiting a delivery report:
 * set by produce_messages() and decremented by dr_cb(). */
static int prod_msg_remains = 0;
/* Failure counter checked by produce_messages() once delivery is done.
 * NOTE(review): nothing visible in this file ever increments it. */
static int fails            = 0;

/**
 * Producer delivery report callback.
 *
 * Invoked once per message. The test is failed if the report carries an
 * error, or if a report arrives while no messages are outstanding;
 * afterwards the outstanding message counter is decreased by one.
 */
static void dr_cb(rd_kafka_t *rk,
                  void *payload,
                  size_t len,
                  rd_kafka_resp_err_t err,
                  void *opaque,
                  void *msg_opaque) {
        const int delivery_failed = (RD_KAFKA_RESP_ERR_NO_ERROR != err);

        if (delivery_failed) {
                TEST_FAIL("Message delivery failed: %s\n",
                          rd_kafka_err2str(err));
        }

        /* More reports than messages produced. */
        if (!prod_msg_remains) {
                TEST_FAIL("Too many messages delivered (prod_msg_remains %i)",
                          prod_msg_remains);
        }

        prod_msg_remains -= 1;
}


/**
 * Produces 'msgcnt' messages split evenly over 'partition_cnt' partitions
 * (msgcnt / partition_cnt messages to each of the partitions
 * 0..partition_cnt-1) and waits for all of them to be delivered.
 *
 * Message ids start at 'msg_base' and increase by one per message.
 * The test id, partition and message id are encoded in both the key
 * and the payload of each message.
 *
 * 'partition_cnt' must be positive and divide 'msgcnt' evenly: otherwise
 * fewer than 'msgcnt' messages would be produced and the delivery
 * accounting in prod_msg_remains could never reach zero.
 */
static void produce_messages(uint64_t testid,
                             const char *topic,
                             int partition_cnt,
                             int msg_base,
                             int msgcnt) {
        int r;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *topic_conf;
        int i;
        int32_t partition;
        int batch_cnt;
        int msgid = msg_base;

        /* Reject arguments that would divide by zero or leave a remainder
         * of messages that is never produced. */
        if (partition_cnt <= 0 || msgcnt < 0 || msgcnt % partition_cnt != 0)
                TEST_FAIL(
                    "Invalid arguments: msgcnt %d must be non-negative and "
                    "evenly divisible by a positive partition_cnt %d",
                    msgcnt, partition_cnt);

        /* Number of messages per partition. */
        batch_cnt = msgcnt / partition_cnt;

        test_conf_init(&conf, &topic_conf, 20);

        rd_kafka_conf_set_dr_cb(conf, dr_cb);

        /* Make sure all replicas are in-sync after producing
         * so that the consume test won't fail.
         * test_topic_conf_set() fails the test if the property is
         * rejected, rather than silently ignoring the error. */
        test_topic_conf_set(topic_conf, "request.required.acks", "-1");

        /* Create kafka instance */
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        if (!rkt)
                TEST_FAIL("Failed to create topic: %s\n",
                          rd_kafka_err2str(rd_kafka_last_error()));

        /* Produce messages: dr_cb() counts prod_msg_remains down to zero
         * as delivery reports come in. */
        prod_msg_remains = msgcnt;
        for (partition = 0; partition < partition_cnt; partition++) {
                for (i = 0; i < batch_cnt; i++) {
                        char key[128];
                        char buf[128];
                        rd_snprintf(key, sizeof(key),
                                    "testid=%" PRIu64 ", partition=%i, msg=%i",
                                    testid, (int)partition, msgid);
                        rd_snprintf(buf, sizeof(buf),
                                    "data: testid=%" PRIu64
                                    ", partition=%i, msg=%i",
                                    testid, (int)partition, msgid);

                        r = rd_kafka_produce(
                            rkt, partition, RD_KAFKA_MSG_F_COPY, buf,
                            strlen(buf), key, strlen(key), NULL);
                        if (r == -1)
                                TEST_FAIL(
                                    "Failed to produce message %i "
                                    "to partition %i: %s",
                                    msgid, (int)partition,
                                    rd_kafka_err2str(rd_kafka_last_error()));
                        msgid++;
                }
        }


        /* Wait for messages to be delivered */
        while (rd_kafka_outq_len(rk) > 0)
                rd_kafka_poll(rk, 100);

        if (fails)
                TEST_FAIL("%i failures, see previous errors", fails);

        if (prod_msg_remains != 0)
                TEST_FAIL("Still waiting for %i messages to be produced",
                          prod_msg_remains);

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy rdkafka instance */
        TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
        rd_kafka_destroy(rk);
}



/* Consumer-side verification state, (re)initialized by
 * verify_consumed_msg_reset(). */

/* Message numbers parsed from consumed keys, in consumption order;
 * unused slots hold -1. */
static int *cons_msgs;
/* Number of slots allocated in cons_msgs. */
static int cons_msgs_size;
/* Number of slots in cons_msgs filled so far. */
static int cons_msgs_cnt;
/* Message number consume_cb() expects for the next message. */
static int cons_msg_next;
/* Message number at which consume_cb() calls rd_kafka_yield(). */
static int cons_msg_stop        = -1;
static int64_t cons_last_offset = -1; /* last offset received */

/**
 * Resets the consumer-side verification state and makes room for
 * 'msgcnt' message numbers, all initialized to -1 (not received).
 *
 * Any previously allocated array is released. With a 'msgcnt' of zero
 * (or less) no new array is allocated, so verify_consumed_msg_reset(0)
 * can be used to free the state at the end of a test.
 */
static void verify_consumed_msg_reset(int msgcnt) {
        /* free(NULL) is a no-op, no guard needed. */
        free(cons_msgs);
        cons_msgs = NULL;

        if (msgcnt > 0) {
                int i;

                cons_msgs = malloc(sizeof(*cons_msgs) * (size_t)msgcnt);
                if (!cons_msgs)
                        TEST_FAIL("Failed to allocate room for %d messages",
                                  msgcnt);

                for (i = 0; i < msgcnt; i++)
                        cons_msgs[i] = -1;
        }

        /* Never advertise more (or fewer than zero) slots than allocated. */
        cons_msgs_size   = msgcnt > 0 ? msgcnt : 0;
        cons_msgs_cnt    = 0;
        cons_msg_next    = 0;
        cons_msg_stop    = -1;
        cons_last_offset = -1;

        TEST_SAY("Reset consumed_msg stats, making room for %d new messages\n",
                 msgcnt);
}


/**
 * qsort() comparator for message numbers.
 *
 * Orders values ascending, except that -1 (a message that was not
 * received) sorts after every other value. Two -1 entries compare equal.
 *
 * Uses comparisons rather than subtraction so the result cannot overflow
 * and the ordering is consistent for any pair of ints.
 *
 * Returns <0, 0 or >0 if *_a sorts before, equal to, or after *_b.
 */
static int int_cmp(const void *_a, const void *_b) {
        const int a = *(const int *)_a;
        const int b = *(const int *)_b;

        if (a == b)
                return 0;

        /* Sort -1 (non-received msgs) at the end */
        if (a == -1)
                return 1;
        if (b == -1)
                return -1;

        return (a > b) - (a < b);
}

/**
 * Checks that the first 'expected_cnt' recorded message numbers, once
 * sorted, form the sequence 0..expected_cnt-1.
 *
 * Each discrepancy is logged with 'desc' as prefix; the test is failed
 * after all of them have been reported. Note that cons_msgs is sorted
 * in place. 'func' and 'line' are accepted for the call site but are
 * not used in the body.
 */
static void verify_consumed_msg_check0(const char *func,
                                       int line,
                                       const char *desc,
                                       int expected_cnt) {
        int errcnt  = 0; /* total number of problems found */
        int missing = 0; /* slots still holding -1 */
        int idx;

        TEST_SAY("%s: received %d/%d/%d messages\n", desc, cons_msgs_cnt,
                 expected_cnt, cons_msgs_size);

        if (cons_msgs_size < expected_cnt) {
                TEST_FAIL("expected_cnt %d > cons_msgs_size %d\n", expected_cnt,
                          cons_msgs_size);
        }

        if (expected_cnt > cons_msgs_cnt) {
                TEST_SAY("%s: Missing %i messages in consumer\n", desc,
                         expected_cnt - cons_msgs_cnt);
                errcnt++;
        }

        /* Order the recorded numbers so that slot N should hold number N. */
        qsort(cons_msgs, cons_msgs_size, sizeof(*cons_msgs), int_cmp);

        for (idx = 0; idx < expected_cnt; idx++) {
                const int got = cons_msgs[idx];

                if (got == idx)
                        continue;

                if (got == -1) {
                        missing++;
                        TEST_SAY("%s: msg %d/%d not received\n", desc, idx,
                                 expected_cnt);
                } else {
                        TEST_SAY(
                            "%s: Consumed message #%i is wrong, "
                            "expected #%i\n",
                            desc, got, idx);
                }

                errcnt++;
        }

        if (missing) {
                TEST_SAY("%s: %d messages not received at all\n", desc,
                         missing);
        }

        if (!errcnt) {
                TEST_SAY(
                    "%s: message range check: %d/%d messages consumed: "
                    "succeeded\n",
                    desc, cons_msgs_cnt, expected_cnt);
                return;
        }

        TEST_FAIL("%s: See above error(s)", desc);
}


/* Convenience wrapper for verify_consumed_msg_check0() that supplies the
 * calling function's name and line number. */
#define verify_consumed_msg_check(desc, expected_cnt)                          \
        verify_consumed_msg_check0(__FUNCTION__, __LINE__, desc, expected_cnt)



/**
 * Parses the key of a consumed message and records its message number.
 *
 * The key is expected to have the format
 * "testid=<id>, partition=<p>, msg=<n>". The test is failed if:
 *  - the key does not fit the local buffer or cannot be parsed,
 *  - the parsed test id differs from 'testid',
 *  - 'partition' is not -1 and differs from the parsed partition,
 *  - 'msgnum' is not -1 and differs from the parsed message number,
 *  - the parsed message number is outside 0..cons_msgs_size-1,
 *  - cons_msgs is already full.
 *
 * On success the parsed message number is appended to cons_msgs and
 * cons_last_offset is set to the message's offset.
 *
 * 'func' and 'line' identify the call site in log output.
 */
static void verify_consumed_msg0(const char *func,
                                 int line,
                                 uint64_t testid,
                                 int32_t partition,
                                 int msgnum,
                                 rd_kafka_message_t *rkmessage) {
        uint64_t in_testid;
        int in_part;
        int in_msgnum;
        char buf[128];
        /* Guard against a NULL key pointer (e.g. a message without key):
         * it must never be handed to the "%s" conversion below. */
        const char *key = rkmessage->key ? (const char *)rkmessage->key : "";

        if (rkmessage->key_len + 1 >= sizeof(buf))
                TEST_FAIL(
                    "Incoming message key too large (%i): "
                    "not sourced by this test",
                    (int)rkmessage->key_len);

        /* The key is not nul-terminated: copy it into a C string. */
        rd_snprintf(buf, sizeof(buf), "%.*s", (int)rkmessage->key_len, key);

        if (sscanf(buf, "testid=%" SCNu64 ", partition=%i, msg=%i", &in_testid,
                   &in_part, &in_msgnum) != 3)
                TEST_FAIL("Incorrect key format: %s", buf);

        if (test_level > 2) {
                TEST_SAY("%s:%i: Our testid %" PRIu64
                         ", part %i (%i), "
                         "msg %i/%i, key's: \"%s\"\n",
                         func, line, testid, (int)partition,
                         (int)rkmessage->partition, msgnum, cons_msgs_size,
                         buf);
        }

        /* Valid message numbers are 0..cons_msgs_size-1, which is what
         * verify_consumed_msg_check0() later requires, so a number equal
         * to cons_msgs_size is out of range too. */
        if (testid != in_testid || (partition != -1 && partition != in_part) ||
            (msgnum != -1 && msgnum != in_msgnum) ||
            (in_msgnum < 0 || in_msgnum >= cons_msgs_size))
                goto fail_match;

        if (cons_msgs_cnt == cons_msgs_size) {
                TEST_SAY(
                    "Too many messages in cons_msgs (%i) while reading "
                    "message key \"%s\"\n",
                    cons_msgs_cnt, buf);
                verify_consumed_msg_check("?", cons_msgs_size);
                TEST_FAIL("See above error(s)");
        }

        cons_msgs[cons_msgs_cnt++] = in_msgnum;
        cons_last_offset           = rkmessage->offset;

        return;

fail_match:
        TEST_FAIL("%s:%i: Our testid %" PRIu64
                  ", part %i, msg %i/%i did "
                  "not match message's key: \"%s\"\n",
                  func, line, testid, (int)partition, msgnum, cons_msgs_size,
                  buf);
}

/* Convenience wrapper for verify_consumed_msg0() that supplies the
 * calling function's name and line number. */
#define verify_consumed_msg(testid, part, msgnum, rkmessage)                   \
        verify_consumed_msg0(__FUNCTION__, __LINE__, testid, part, msgnum,     \
                             rkmessage)


/**
 * Per-message callback handed to rd_kafka_consume_callback().
 *
 * 'opaque' must point to the uint64_t test id of the current test.
 *
 * Partition EOF events are logged and otherwise ignored; any other error
 * fails the test. A proper message is verified to carry the test id, the
 * partition it was received on and message number cons_msg_next, after
 * which cons_msg_next is advanced by one.
 *
 * When the message just handled is number cons_msg_stop, rd_kafka_yield()
 * is called so that the dispatching consume call returns to its caller.
 */
static void consume_cb(rd_kafka_message_t *rkmessage, void *opaque) {
        /* The caller passes a pointer to a uint64_t and the value is
         * forwarded as uint64_t: read it with that same type. */
        const uint64_t testid = *(const uint64_t *)opaque;

        if (test_level > 2)
                TEST_SAY("Consumed message #%d? at offset %" PRId64 ": %s\n",
                         cons_msg_next, rkmessage->offset,
                         rd_kafka_err2str(rkmessage->err));

        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                TEST_SAY("EOF at offset %" PRId64 "\n", rkmessage->offset);
                return;
        }

        if (rkmessage->err)
                TEST_FAIL(
                    "Consume message from partition %i "
                    "has error: %s",
                    (int)rkmessage->partition,
                    rd_kafka_err2str(rkmessage->err));

        verify_consumed_msg(testid, rkmessage->partition, cons_msg_next,
                            rkmessage);

        if (cons_msg_next == cons_msg_stop) {
                rd_kafka_yield(NULL /*FIXME*/);
        }

        cons_msg_next++;
}

/**
 * Consumes messages from 'partition' of 'topic' with the callback-based
 * consume API, in 'iterations' back-to-back start/stop cycles of
 * 'msg_cnt' messages each.
 *
 * desc                 log prefix for this run
 * testid               test id the consumed message keys must carry
 * topic, partition     what to consume from
 * offset_store_method  value for the "offset.store.method" topic property;
 *                      for "broker" the topic name is also used as group.id
 * msg_base             only used in log output
 * msg_cnt              number of messages to consume per iteration
 * initial_offset       offset passed to the first rd_kafka_consume_start().
 *                      Unless it is RD_KAFKA_OFFSET_STORED, each following
 *                      iteration starts at the offset after the last
 *                      consumed message.
 * iterations           number of start/consume/stop cycles
 *
 * Message numbering continues from the file-scope cons_msg_next, so the
 * verification state must have been set up with
 * verify_consumed_msg_reset() beforehand.
 */
static void consume_messages_callback_multi(const char *desc,
                                            uint64_t testid,
                                            const char *topic,
                                            int32_t partition,
                                            const char *offset_store_method,
                                            int msg_base,
                                            int msg_cnt,
                                            int64_t initial_offset,
                                            int iterations) {
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *topic_conf;
        int i;

        TEST_SAY("%s: Consume messages %d+%d from %s [%" PRId32
                 "] "
                 "from offset %" PRId64 " in %d iterations\n",
                 desc, msg_base, msg_cnt, topic, partition, initial_offset,
                 iterations);

        test_conf_init(&conf, &topic_conf, 20);

        test_topic_conf_set(topic_conf, "offset.store.method",
                            offset_store_method);

        if (!strcmp(offset_store_method, "broker")) {
                /* Broker based offset storage requires a group.id */
                test_conf_set(conf, "group.id", topic);
        }

        test_conf_set(conf, "enable.partition.eof", "true");

        /* Create kafka instance */
        rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

        /* Use the checked helper, like for offset.store.method above,
         * so that a rejected property fails the test. */
        test_topic_conf_set(topic_conf, "auto.offset.reset", "smallest");

        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        if (!rkt)
                TEST_FAIL("%s: Failed to create topic: %s\n", desc,
                          rd_kafka_err2str(rd_kafka_last_error()));

        /* Number of the last message to consume in the first iteration. */
        cons_msg_stop = cons_msg_next + msg_cnt - 1;

        /* Consume the same batch of messages multiple times to
         * make sure back-to-back start&stops work. */
        for (i = 0; i < iterations; i++) {
                int cnta;
                test_timing_t t_stop;

                TEST_SAY(
                    "%s: Iteration #%i: Consuming from "
                    "partition %i at offset %" PRId64
                    ", "
                    "msgs range %d..%d\n",
                    desc, i, partition, initial_offset, cons_msg_next,
                    cons_msg_stop);

                /* Consume messages */
                if (rd_kafka_consume_start(rkt, partition, initial_offset) ==
                    -1)
                        TEST_FAIL("%s: consume_start(%i) failed: %s", desc,
                                  (int)partition,
                                  rd_kafka_err2str(rd_kafka_last_error()));


                /* Keep consuming until message number cons_msg_stop has
                 * been handled. consume_cb() increments cons_msg_next
                 * after each message, so the stop message is done only
                 * once cons_msg_next has moved past cons_msg_stop. */
                cnta = cons_msg_next;
                do {
                        rd_kafka_consume_callback(rkt, partition, 1000,
                                                  consume_cb, &testid);
                } while (cons_msg_next <= cons_msg_stop);

                TEST_SAY("%s: Iteration #%i: consumed %i messages\n", desc, i,
                         cons_msg_next - cnta);

                TIMING_START(&t_stop, "rd_kafka_consume_stop()");
                if (rd_kafka_consume_stop(rkt, partition) == -1)
                        TEST_FAIL("%s: consume_stop(%i) failed: %s", desc,
                                  (int)partition,
                                  rd_kafka_err2str(rd_kafka_last_error()));
                TIMING_STOP(&t_stop);

                /* Advance next offset so we don't reconsume
                 * messages on the next run. */
                if (initial_offset != RD_KAFKA_OFFSET_STORED) {
                        initial_offset = cons_last_offset + 1;
                        cons_msg_stop  = cons_msg_next + msg_cnt - 1;
                }
        }

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy rdkafka instance */
        TEST_SAY("%s: Destroying kafka instance %s\n", desc, rd_kafka_name(rk));
        rd_kafka_destroy(rk);
}



/**
 * Runs one produce/consume round using 'offset_store_method' as the
 * consumer's "offset.store.method".
 *
 * Steps:
 *  1. Produce 100 messages to a freshly named topic with one partition.
 *  2. Consume the first half starting from the stored offset and verify.
 *  3. Consume the second half, again from the stored offset, and verify
 *     that all messages have now been seen.
 *  4. Reset the verification state and consume everything again from a
 *     logical tail offset, in four start/stop iterations, and verify.
 */
static void test_produce_consume(const char *offset_store_method) {
        const int msgcnt          = 100;
        const int partition_cnt   = 1;
        const int msg_base        = 0;
        const int per_partition   = msgcnt / partition_cnt;
        const int tail_iterations = 4;
        uint64_t testid;
        const char *topic;
        int part;

        /* Generate a testid so we can differentiate messages
         * from other tests */
        testid = test_id_generate();

        /* Read test.conf to configure topic name */
        test_conf_init(NULL, NULL, 20);
        topic = test_mk_topic_name("0014", 1 /*random*/);

        TEST_SAY("Topic %s, testid %" PRIu64 ", offset.store.method=%s\n",
                 topic, testid, offset_store_method);

        produce_messages(testid, topic, partition_cnt, msg_base, msgcnt);

        /* Make room for all of the messages. */
        verify_consumed_msg_reset(msgcnt);

        /* First half: stored offsets, with no offset stored yet. */
        for (part = 0; part < partition_cnt; part++) {
                consume_messages_callback_multi(
                    "STORED.1/2", testid, topic, part, offset_store_method,
                    msg_base, per_partition / 2, RD_KAFKA_OFFSET_STORED, 1);
        }
        verify_consumed_msg_check("STORED.1/2", msgcnt / 2);

        /* Second half: resume from the offset stored by the first half. */
        for (part = 0; part < partition_cnt; part++) {
                consume_messages_callback_multi(
                    "STORED.2/2", testid, topic, part, offset_store_method,
                    msg_base, per_partition / 2, RD_KAFKA_OFFSET_STORED, 1);
        }
        verify_consumed_msg_check("STORED.2/2", msgcnt);

        /* Start over and consume using a logical (tail) offset. */
        verify_consumed_msg_reset(msgcnt);
        for (part = 0; part < partition_cnt; part++) {
                /* Offset at which to start. */
                int64_t tail_offset = RD_KAFKA_OFFSET_TAIL(per_partition);

                consume_messages_callback_multi(
                    "TAIL+", testid, topic, part, offset_store_method,
                    /* msgid at which to start */
                    msg_base,
                    /* messages per iteration */
                    per_partition / tail_iterations, tail_offset,
                    tail_iterations);
        }
        verify_consumed_msg_check("TAIL+", msgcnt);

        /* Release the verification state. */
        verify_consumed_msg_reset(0);
}



/**
 * Test entry point.
 *
 * Always runs the produce/consume round with "file" offset storage.
 * The "broker" offset storage round is run first, but only when the
 * broker version under test is at least 0.8.2.0.
 *
 * Always returns 0.
 */
int main_0014_reconsume_191(int argc, char **argv) {
        const int run_broker_round =
            test_broker_version >= TEST_BRKVER(0, 8, 2, 0);

        if (run_broker_round) {
                test_produce_consume("broker");
        }

        test_produce_consume("file");

        return 0;
}
